Kinesis Data Streamsに流れてくるイベントをPersonalizeのEventTrackerに投げてみる
Personalizeにはイベントを収集する機能があり、イベントデータが追加されることで、キャンペーンによるリアルタイムレコメンドに反映させることができます。また蓄積したイベントデータを使ったソリューションバージョン(レコメンドモデル)の作成も可能です。その際に使用するイベントはブラウザや外部システム等からKinesisに集められて、そこから必要な処理が施されるケースが多いかと思います。
今回は、Kinesis Data Streamsにイベントデータが送られてくるシステムを想定して、PersonalizeのEventTrackerへのイベントデータの送信を試してみました。EventTrackerによってPersonalizeに蓄積したイベントデータは後から参照することができないため、Kinesis Data Firehoseを使ったS3への保存も合わせて行いました。
構成図
今回試したシステムは次のような構成です。
スクリプト
Kinesisのストリームから取得したレコードからイベントデータを取り出し、PersonalizeのEventTrackerへ投げるだけのシンプルな処理です。ストリームの後続のLambda関数がこの処理を実行します。
import json import boto3 import base64 import os TRACKING_ID = os.environ['TRACKING_ID'] personalize_events = boto3.client('personalize-events') def handler(event, _): for record in event['Records']: parsed_event = json.loads( base64.b64decode(record['kinesis']['data'])) personalize_events.put_events( trackingId=TRACKING_ID, userId=str(parsed_event['userId']), sessionId=str(parsed_event['sessionId']), eventList=[{ 'sentAt': str(parsed_event['sentAt']), 'eventType': parsed_event['eventType'], 'properties': json.dumps({ 'itemId': str(parsed_event['itemId']) }) }] )
試してみる
PersonalizeでEventTrackerを作成し、Serverless Frameworkを使ってKinesis等のスタックをデプロイします。 その後、Kinesis Data Generatorを使って、Kinesisのストリームにデータを流して動きを確認してみます。 使用するスクリプトはGitHubで公開しています。
EventTrackerを作成する
まずはPersonalizeのEventTrackerを作成します。EventTrackerの作成にはデータセットグループとインタラクション用データセットが必要なので、先に作成します。 今の所、PersonalizeはCloudFormationが対応していないので、マネジメントコンソールで作業します。
まずはデータセットグループを作成します。
インタラクションタイプのデータセットも作成します。
データセットのスキーマはデフォルトのものを利用します。
データセットを作成すると、データをインポートする画面になりますが、今回は不要なのでキャンセルします。
ダッシュボード画面からEventTrackerを作成します。
名前を入力するだけで作成できます。
表示されたトラッキングIDはこの後使うため、控えておきます。
スタックを作成する
リポジトリをクローンしてきて、Serverless Frameworkを使ってデプロイします。デプロイの際には先ほど作成したEventTrackerのトラッキングIDを指定します。
git clone https://github.com/tandfy/sample-for-kinesis-to-personalize-event.git cd sample-for-kinesis-to-personalize-event yarn install TRACKING_ID={トラッキングID} sls deploy
しばらくすると、スタックの作成が完了します。
イベントデータを流す
Kinesis Data Generatorを使ってイベントデータを流します。まずはKinesis Data Generatorを使えるようにするために必要なスタックを作成します。
Create a Cognito User with CloudFormation
をクリックして、流れに沿って、スタックを作成します。
作成ができたら、スタックの出力にリンクが表示されているので、アクセスします。 リンク先はKinesis Data Generateで、URLにCognito Userに関する設定情報が入力されており、自動的に設定が施された状態になります。
スタック作成の中で設定したUsername
とPassword
を入力してログインします。
リージョンとストリームを選択します。
秒間あたりのレコード数は10件で試します。レコードの圧縮はしません。
Record template
に次のようなイベント用のテンプレートデータを入力します。
{ "sessionId": {{random.number(50)}}, "userId": {{random.number(1000)}}, "itemId": {{random.number(1000)}}, "eventType": "{{random.arrayElement( ["CLICK","PAGE_VIEW"] )}}", "sentAt": {{random.number( { "min":1585200000, "max":1585300000 } )}} }
Send data
を押すと、イベントデータがKinesisのストリームに送られます。
ある程度イベントデータが送られたら、送信を停止します。
送信されたイベントの確認
ClowdWatchからPersonalizeのEventTrackerのメトリクスを確認できます。今回はPutEventsRequests
を見てみます。
今回は各イベントを1件ずつPutしているため、リクエスト数=イベント数
となります。
200件のリクエストが成功していることを確認できました。
続いて、S3に保存されたイベントを確認してみます。 次のように日時ごとにイベントが保存されています。今回は短時間しかイベントを送っていないので、1つしかデータが保存されていません。
Kinesisに送信したイベントデータがJSON形式のため、S3に保存されたイベントデータは次のようなJSON Lines形式になっています。
さいごに
Kinesisのストリームに流れてきたイベントをPersonalizeに蓄積する流れを紹介しました。イベントをPutするためのLambda関数、S3にデータを保存するFirehoseというシンプルな構成ながら、Personalizeに蓄積したデータを使ってレコメンドを作成したり、S3に蓄積したデータを使って分析するなど幅広い活用が考えられます。